#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <time.h>
#include <sys/time.h>

#include "definition.h"
#include "epoll_server.h"

#define SERV_IP "127.0.0.1"
#define SERV_PORT 6330
#define US_PER_SECOND 1000*1000
#define MAX_THREAD 8

void stopmain(int signal);
void skippipeerror(int signal);
void writeoutrecords();
void refreshrecords(int serv_port);
int server_fd, efd;
LeonRing leon;

extern uint16_t lenmsgpool[10000];
extern int lenmsgi;
uint32_t num_packet_processed;
double packet_process_speed[10000];
int pktprcsed_i = 0;
int pktprcsed_num = 0;
extern int getpidval;

char rps_outfile[60] = "rps_port.txt"; //
char len_outfile[60] = "len_port.txt";
int filenameid = 0; // number for filename
char *fileprefix = "./";
int num_conn;
pthread_mutex_t conn_num_lock;

#define MAXCLIENTFDNUMBER 2000
int allclientfd[MAXCLIENTFDNUMBER]; // all clientfd saved here, max is
int numclientfd = 0; //

/* for event dispath */
int do_flag[MAX_THREAD];
int num_thread;
static thread_contex *thread_ctx;

#ifdef DISPATCH
static void *dispatch_work_thread(void *arg);
#else
static void *work_thread(void *arg);
#endif
void *log_thread(void *args);

#ifdef DISPATCH
#ifdef MEMPOOL
static mem_pool * mem_pool_init(size_t total_item)
{
	int res;
	mem_pool *mp;
	mp = malloc(sizeof(mem_pool));
	if (mp == NULL) {
		perror("Failed to allocate memory for mem pool");
		exit(EXIT_FAILURE);
	}

	mp->chunk_size =sizeof(eq_item);
	mp->free_chunks = total_item;
	mp->total_chunks = mp->free_chunks;
	res = posix_memalign((void **)&mp->startptr, getpagesize(), total_item*mp->chunk_size);
	if (res != 0) {
		printf("posix_memalign failed, size=%ld\n", total_item);
		//assert(0);
		if (mp) free(mp);
		exit(-1);
	}

	mp->freeptr = (eq_item *)mp->startptr;
	mp->freeptr->next = NULL;
	//mp->freeptr->event.events = 0;
	pthread_mutex_init(&mp->lock, NULL);

	return mp;
}

inline eq_item *get_ptr(mem_pool *mp) {
	pthread_mutex_lock(&mp->lock);
	eq_item *freep = mp->freeptr;
	if (mp->free_chunks == 0) {
		printf("No free chunks in mem pool\n");
		return NULL;
	}
	//printf("%u: Before get, free_chunks: %u\n", pthread_self(), mp->free_chunks);
	mp->free_chunks--;
	if (!freep->next) {
		mp->freeptr = (eq_item *)((u_char *)freep + mp->chunk_size);
		//mp->freeptr->event.events = 0; /* fresh mem */
		mp->freeptr->next = NULL;
	}
	else {
		mp->freeptr = freep->next;
	}
	pthread_mutex_unlock(&mp->lock);
	return freep;
}

inline void free_ptr(mem_pool *mp, eq_item *p)
{
	eq_item * freep = p;
	//freep->event.events = -1; /* recycled mem */
	pthread_mutex_lock(&mp->lock);
	freep->next = mp->freeptr;
	mp->freeptr = freep;
	mp->free_chunks++;
	pthread_mutex_unlock(&mp->lock);
}
#endif

static inline void eq_init(events_queue* eq)
{
	pthread_mutex_init(&eq->lock, NULL);
	eq->head = NULL;
	eq->tail = NULL;
#ifdef MEMPOOL
	eq->event_pool = mem_pool_init(MAX_EVENTS);
#endif
}

static inline void eq_enqueue(events_queue * eq, struct epoll_event *event)
{
	eq_item *item;
#ifdef MEMPOOL
	item = get_ptr(eq->event_pool);
#else
	item = (eq_item *)malloc(sizeof(eq_item));
#endif
	item->event = *event;
	item->next = NULL;
	pthread_mutex_lock(&eq->lock);
	if (NULL == eq->tail)
		eq->head = item;
	else
		eq->tail->next = item;
	eq->tail = item;
	pthread_mutex_unlock(&eq->lock);
}

static inline int eq_dequeue(events_queue * eq, struct epoll_event *event)
{
	eq_item *item;
	pthread_mutex_lock(&eq->lock);
	item = eq->head;

	if (NULL != item) {
		eq->head = item->next;
		if (NULL == eq->head)
			eq->tail = NULL;
		pthread_mutex_unlock(&eq->lock);
		*event = item->event;
#ifdef MEMPOOL
		free_ptr(eq->event_pool, item);
#else
		free(item);
#endif
		return 1;
	}
	pthread_mutex_unlock(&eq->lock);
	return 0;
}
#endif

#ifdef DISPATCH
void inline dispatch_event(struct epoll_event *event)
{
	//eq_item *item = malloc(sizeof(eq_item));
	thread_contex *thread;

	int tid = event->data.fd % num_thread;

	do_flag[tid] = 1;
	thread = thread_ctx + tid;
	eq_enqueue(thread->new_events_queue, event);
	//printf("Dispath event.fd %d to thread %d\n", event.data.fd, thread->thread_id);
}
#else
void inline dispatch_conn(int fd)
{
	struct epoll_event event;
	thread_contex *thread;

	int tid = fd % num_thread;

	do_flag[tid] = 1;
	thread = thread_ctx + tid;

	event.events = EPOLLIN | EPOLLET;
	event.data.fd = fd;
	epoll_ctl(thread->efd, EPOLL_CTL_ADD, fd, &event);
	//printf("Dispath event.fd %d to thread %d\n", event.data.fd, thread->thread_id);
}
#endif

inline void setup_thread(thread_contex * me)
{
#ifdef DISPATCH
	int fds[2];

	if (pipe(fds)) {
		perror("Can't create notify pipe");
		exit(1);
	}
	me->notify_receive_fd = fds[0];
	me->notify_send_fd = fds[1];

    #ifdef DISPATCH
	me->new_events_queue = malloc(sizeof(struct events_queue));
	if (me->new_events_queue == NULL) {
		perror("Failed to allocate memory for event queue");
		exit(EXIT_FAILURE);
	}
	
	eq_init(me->new_events_queue);
	#endif

#ifdef COND_SIG
	if (pthread_mutex_init(&me->lock, NULL)) {
		printf("Failed to create stack singal\n");
		exit(-1);
	}
	if (pthread_cond_init(&me->cond, NULL)) {
		printf("Failed to create log thread\n");
		exit(-1);
	}
#endif
#else
	if ((me->efd = epoll_create(MAX_EVENTS)) == -1) {
		perror("epoll_create()\n");
		exit(-1);
	}
#endif
	me->num_msg = 0;

}

inline void create_worker(void *(*func) (void *), void *arg)
{
	pthread_attr_t attr;
	int ret;

	pthread_attr_init(&attr);

	if ((ret = pthread_create(&((thread_contex *) arg)->thread_id, &attr, func, arg)) != 0) {
		printf("Can't create thread: %s\n", strerror(ret));
		exit(1);
	}

}

void create_thread(int num_thread)
{
	int i = 0;
#ifdef COND_SIG
	printf("Use cond singal\n");
#else
	printf("Use linux pipe\n");
#endif
#ifdef MEMPOOL
	printf("Use mem pool\n");
#endif
	thread_ctx = calloc(MAX_THREAD, sizeof(thread_contex));
	if (!thread_ctx) {
		perror("Can't allocate thread descriptors");
		exit(1);
	}
	for (i = 0; i < num_thread; i++) {
		setup_thread(&thread_ctx[i]);
	}
	/* Create thread_ctx  */
	for (i = 0; i < num_thread; i++) {
#ifdef DISPATCH
		create_worker(dispatch_work_thread, &thread_ctx[i]);
#else
		create_worker(work_thread, &thread_ctx[i]);
#endif
	}
}

#ifdef DISPATCH
static void *dispatch_work_thread(void *arg)
{
	thread_contex *me = arg;
	/* Any per-thread setup can happen here; memcached_thread_init() will block until
	 * all thread_ctx have finished initializing.
	 */
	printf("Dispatch worker: %u start\n", me->thread_id);
	char buf[1];

	struct epoll_event event, tmp_event;
	uint16_t serv_port = SERV_PORT;	// port
	int tempfd;
	while (1) {
#ifdef COND_SIG
		pthread_mutex_lock(&me->lock);
		pthread_cond_wait(&me->cond, &me->lock);
		pthread_mutex_unlock(&me->lock);
#else
		if (read(me->notify_receive_fd, buf, 1) != 1)
			fprintf(stderr, "Can't read from libevent pipe\n");
		//printf("child thread %u, pipe read: %c\n", me->thread_id, buf[0]);
		if (buf[0] == 'c') {
#endif
			while (me->new_events_queue->tail != NULL) {

				struct epoll_event event;
				int ret = eq_dequeue(me->new_events_queue, &event);
				//printf("child thread %u, event.fd: %d\n", pthread_self(), event.data.fd);

				if (ret) {
					if (event.events & EPOLLIN) { // data in
						tempfd = event.data.fd;
						if (tempfd < 0) {
							printf("Why sockfd is less than 0? \n");
							continue;
						}

						if (recvLeonRing(&leon, tempfd) == -2) {
							epoll_ctl(efd, EPOLL_CTL_DEL, tempfd, &tmp_event);
							printf("%d: [recv]close client: %d. \n", getpidval, tempfd);
							pthread_mutex_lock(&conn_num_lock);
							num_conn--;
							pthread_mutex_unlock(&conn_num_lock);
							if (num_conn == 0) {
								//printf("in recv write %s %s ......\n", rps_outfile, len_outfile);
								writeoutrecords();
								refreshrecords(serv_port);
							}
						}
						else if (processLeonRing(&leon, tempfd, &me->num_msg, me) < 0) {
							epoll_ctl(efd, EPOLL_CTL_DEL, tempfd, &tmp_event);
							printf("%d: [send]close client: %d. \n", getpidval, tempfd);
							pthread_mutex_lock(&conn_num_lock);
							num_conn--;
							pthread_mutex_unlock(&conn_num_lock);
							if (num_conn == 0) {
								//printf("in send write %s %s ......\n", rps_outfile, len_outfile);
								writeoutrecords();
								refreshrecords(serv_port);
							}
						}
						else if (lenRingSend(leon.sockfd[tempfd]) > 0) {
							tmp_event.events = EPOLLOUT | EPOLLET;
							tmp_event.data.fd = tempfd;
							epoll_ctl(efd, EPOLL_CTL_MOD, tempfd, &tmp_event);
						}
					}

					else if (event.events & EPOLLOUT) { // data in
						//printf("EPOLLIN -----------\n");
						tempfd = event.data.fd;
						if (tempfd < 0) {
							printf("Why sockfd is 0\n"); continue;
						}

						if (processLeonRing(&leon, tempfd, &me->num_msg, me) < 0) { // return -1 -2
							epoll_ctl(efd, EPOLL_CTL_DEL, tempfd, &tmp_event);
							//printf("Sockfd closed when send......\n");
							printf("%d: [send]close client: %d. \n", getpidval, tempfd);
							pthread_mutex_lock(&conn_num_lock);
							num_conn--;
							pthread_mutex_unlock(&conn_num_lock);
							if (num_conn == 0) {
								//printf("in send write %s %s ......\n", rps_outfile, len_outfile);
								writeoutrecords();
								refreshrecords(serv_port);
							}
						}
						if (lenRingSend(leon.sockfd[tempfd]) == 0){
							tmp_event.events = EPOLLIN | EPOLLET;
							tmp_event.data.fd = tempfd;
							epoll_ctl(efd, EPOLL_CTL_MOD, tempfd, &tmp_event);
						}
					}
				}
			}
#ifndef COND_SIG
		}
#endif
	}
	return NULL;
}

#else
static void *work_thread(void *arg)
{
	thread_contex *me = arg;
	struct epoll_event tmp_event;
	struct epoll_event events[MAX_EVENTS];
	uint16_t serv_port = SERV_PORT;	// port
	int sfd, efd;
	int nevent, i;

	printf("Worker: %u start\n", me->thread_id);
	efd = me->efd;
	while (1) {
		nevent = epoll_wait(efd, events, MAX_EVENTS, -1);
		for(i = 0; i < nevent; i++) {
			if (events[i].events & EPOLLIN) { // data in
				sfd = events[i].data.fd;
				if (sfd < 0) {
					printf("Why sockfd is less than 0? \n");
					continue;
				}

				if (recvLeonRing(&leon, sfd) == -2) {
					epoll_ctl(efd, EPOLL_CTL_DEL, sfd, &tmp_event);
					printf("%d: [recv]close client: %d. \n", getpidval, sfd);
					pthread_mutex_lock(&conn_num_lock);
					num_conn--;
					pthread_mutex_unlock(&conn_num_lock);
					if (num_conn == 0) {
						//printf("in recv write %s %s ......\n", rps_outfile, len_outfile);
						writeoutrecords();
						refreshrecords(serv_port);
					}
				}
				else if (processLeonRing(&leon, sfd, &me->num_msg, me) < 0) {
					epoll_ctl(efd, EPOLL_CTL_DEL, sfd, &tmp_event);
					printf("%d: [send]close client: %d. \n", getpidval, sfd);
					pthread_mutex_lock(&conn_num_lock);
					num_conn--;
					pthread_mutex_unlock(&conn_num_lock);
					if (num_conn == 0) {
						//printf("in send write %s %s ......\n", rps_outfile, len_outfile);
						writeoutrecords();
						refreshrecords(serv_port);
					}
				}
				else if (lenRingSend(leon.sockfd[sfd]) > 0) {
					tmp_event.events = EPOLLOUT | EPOLLET;
					tmp_event.data.fd = sfd;
					epoll_ctl(efd, EPOLL_CTL_MOD, sfd, &tmp_event);
				}
			}

			else if (events[i].events & EPOLLOUT) { // data in
				//printf("EPOLLIN -----------\n");
				sfd = events[i].data.fd;
				if (sfd < 0) {
					printf("Why sockfd is 0\n"); continue;
				}

				if (processLeonRing(&leon, sfd, &me->num_msg, me) < 0) { // return -1 -2
					epoll_ctl(efd, EPOLL_CTL_DEL, sfd, &tmp_event);
					//printf("Sockfd closed when send......\n");
					printf("%d: [send]close client: %d. \n", getpidval, sfd);
					pthread_mutex_lock(&conn_num_lock);
					num_conn--;
					pthread_mutex_unlock(&conn_num_lock);
					if (num_conn == 0) {
						//printf("in send write %s %s ......\n", rps_outfile, len_outfile);
						writeoutrecords();
						refreshrecords(serv_port);
					}
				}
				if (lenRingSend(leon.sockfd[sfd]) == 0){
					tmp_event.events = EPOLLIN | EPOLLET;
					tmp_event.data.fd = sfd;
					epoll_ctl(efd, EPOLL_CTL_MOD, sfd, &tmp_event);
				}
			}
		}
	}
	return NULL;
}
#endif

int main(int argc, char *argv[]) { //
	char helpinfo[] = "Usage: ./epoll_server [-h | -pnf args]\n\n"
					  "Options: \n"
					  "\t-p: port number, default 6330; \n"
					  "\t-n: number of threads, default 8; \n"
					  "\t-f: file path, default ./; \n"
					  "\t-h: print this help info. \n";

	uint16_t serv_port = SERV_PORT;
	char msg_buf[1];
	int i;
	pthread_t pth;

	int optval;
	while ((optval = getopt(argc, argv, "p:n:f:h")) != -1) {
		switch (optval) {
		case 'p': // port number
			serv_port = atoi(optarg);
			break;
		case 'n': // number of threads
			num_thread = atoi(optarg);
			break;
		case 'f': // file path
			fileprefix = optarg;
			break;
		case 'h': // help infomation
			printf("%s", helpinfo);
			exit(-1);
			break;
		case '?': // undefined opt
			break;
		case ':': // no arg
			break;
		default:
			break;
		}
	}

	if (num_thread > MAX_THREAD || num_thread < 1)
		num_thread = MAX_THREAD;

	sprintf(rps_outfile, "%srps_%d_%d.txt", fileprefix, serv_port, filenameid);
	sprintf(len_outfile, "%slen_%d_%d.txt", fileprefix, serv_port, filenameid);


	struct sockaddr_in server_addr, client_addr;
	socklen_t len_cli = sizeof(struct sockaddr);

	struct epoll_event events[MAX_EVENTS], tmp_event;

	signal(SIGINT, stopmain);
	struct sigaction action;
	action.sa_handler = skippipeerror;
	sigemptyset(&action.sa_mask);
	action.sa_flags = 0;
	sigaction(SIGPIPE, &action, NULL);

	getpidval = getpid();

	if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
		printf("%d: socket failed ---------\n", getpidval); exit(-1);
	}

	fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(serv_port);
	server_addr.sin_addr.s_addr = INADDR_ANY;

	if (bind(server_fd, (struct sockaddr *)&server_addr, len_cli) == -1) {
		printf("Bind failed\n");
		exit(-1);
	}

	if (listen(server_fd, 256) == -1) {
		printf("Listen failed\n");
		exit(-1);
	}

	if ((efd = epoll_create(MAX_EVENTS)) == -1) {
		printf("epoll_create() failed ---\n");
		exit(-1);
	}

	tmp_event.events = EPOLLIN;
	tmp_event.data.fd = server_fd;

	epoll_ctl(efd, EPOLL_CTL_ADD, server_fd, &tmp_event);

	int num_epwait, tmpi, tempfd;

	initLeonRing(&leon); // init buffer
	memset(allclientfd, 0, sizeof(int)); //

	pthread_mutex_init(&conn_num_lock, NULL); //

	create_thread(num_thread);
	pthread_create(&pth, NULL, log_thread, NULL); // just for count

	while (1) {
		num_epwait = epoll_wait(efd, events, MAX_EVENTS, -1);
		//printf("#### in while loop [%d]\n", num_epwait);
		for (tmpi = 0; tmpi < num_epwait; tmpi++) { // for every new event

			if (events[tmpi].data.fd == server_fd) {
				while ((tempfd = accept(server_fd, (struct sockaddr *)&client_addr,
										   &len_cli)) > 0) {
					printf("%d: new client: %d. \n", getpidval, tempfd); // whether sockfd right?
					fcntl(tempfd, F_SETFL, fcntl(tempfd, F_GETFL, 0) | O_NONBLOCK);
					tmp_event.events = EPOLLIN | EPOLLET;
					tmp_event.data.fd = tempfd;
					allclientfd[numclientfd] = tempfd; //
					numclientfd = (numclientfd + 1) % MAXCLIENTFDNUMBER; //
#ifdef DISPATCH
					epoll_ctl(efd, EPOLL_CTL_ADD, tempfd, &tmp_event);
#else
					dispatch_conn(tempfd);
#endif
					if (num_conn == 0) { // clear files
						fopen(rps_outfile, "w");
						fopen(len_outfile, "w");
					}
					pthread_mutex_lock(&conn_num_lock);
					num_conn++; // connections count ++
					pthread_mutex_unlock(&conn_num_lock);
				}
			}
			else {
#ifdef DISPATCH
				dispatch_event(&events[tmpi]);
#endif
			}
		} // finished for-loop
		//printf("@@@@ end of while-loop................\n");
		//if (--loop < 0) break;
#ifdef DISPATCH
		for (i = 0; i < num_thread; i++) {
			if (do_flag[i] == 1) {
#ifdef COND_SIG
				pthread_mutex_lock(&thread_ctx[i].lock);
				pthread_cond_signal(&thread_ctx[i].cond);
				pthread_mutex_unlock(&thread_ctx[i].lock);
#else
				msg_buf[0] = 'c';
				if (write(thread_ctx[i].notify_send_fd, msg_buf, 1) != 1) {
					printf("Writing to thread notify pipe");
				}
#endif
			}
		}
#endif
	} // finished while-loop

	close(server_fd);
	close(efd);
	pthread_mutex_destroy(&conn_num_lock);
	return 0;
}

void stopmain(int signal) {
	printf("Get singal Ctrl+C\n");

	int tmpi = 0;
	for (; tmpi < MAXCLIENTFDNUMBER; tmpi++) {
		if (allclientfd[tmpi] > 0) close(allclientfd[tmpi]);
	}

	close(server_fd);
	close(efd);
	freeLeonRing(&leon);
	pthread_mutex_destroy(&conn_num_lock);

	writeoutrecords(); // write records

	printf("Close sockets\n");
	_exit(0);
}

void skippipeerror(int signal) {
	printf("**** SIGPIPE ****\n");
}

void writeoutrecords() {
	//*
	FILE *fil = 0;
	FILE *fil1 = 0;
	fil = fopen(len_outfile, "a+");
	if (!fil) {
		printf("Cannot open %s\n", len_outfile);
		goto close;
	}
	int tmpi = 0;
	for (; tmpi < 10000; tmpi++) {
		fprintf(fil, "%u\n", lenmsgpool[tmpi]);
	}
	//printf("\n");//*/

	fil1 = fopen(rps_outfile, "a+");
	if (!fil) {
		printf("Cannot open %s\n", rps_outfile);
		goto close;
	}
	//int tmpi = 0;

	for (tmpi = 0; tmpi < (pktprcsed_num > 10000 ? 10000 : pktprcsed_num); tmpi++) {
		fprintf(fil1, "%lf\n", packet_process_speed[tmpi]);
	}
	fprintf(fil1, "total prcsed: %lu\n", num_packet_processed);

close:
	if (fil)
		fclose(fil1);
	if (fil1)
		fclose(fil);
}

void refreshrecords(int serv_port) {
	filenameid++;
	sprintf(rps_outfile, "%srps_%d_%d.txt", fileprefix, serv_port, filenameid);
	sprintf(len_outfile, "%slen_%d_%d.txt", fileprefix, serv_port, filenameid);
	pktprcsed_i = 0;
	num_packet_processed = 0;
	pktprcsed_num = 0;
	lenmsgi = 0;
	memset(lenmsgpool, 0, 10000 * sizeof(uint16_t)); // init lenmsgpool
}

void *log_thread(void *args) {
	packet_process_speed[0] = 0;
	int preprcsed = num_packet_processed;
	int cur_rps, i;
	//struct timespec pre_time;
	//clock_gettime(CLOCK_MONOTONIC, &pre_time);
	//struct timespec cur_time;
	struct timeval pre_time;
	struct timeval cur_time;
	gettimeofday(&pre_time, NULL);
	//FILE *ouc = fopen("ttt.txt", "w");
	//int getpidval = getpid();
	while (1) {
		sleep(3); // wait for 1 second
		num_packet_processed = 0;
		for (i = 0; i < num_thread; i++) {
			num_packet_processed += thread_ctx[i].num_msg;
		}
		//printf("Log thread: rps: %d\n", num_packet_processed);
		cur_rps = num_packet_processed;
		//cur_time = clock();
		//clock_gettime(CLOCK_MONOTONIC, &cur_time);
		gettimeofday(&cur_time, NULL);
		if (cur_rps == 0)
			preprcsed = 0;
		if (cur_rps > preprcsed) {
			 //packet_process_speed[pktprcsed_i] = (double)(cur_rps - preprcsed);
			int timetime = US_PER_SECOND * (cur_time.tv_sec - pre_time.tv_sec) + cur_time.tv_usec - pre_time.tv_usec;
			 packet_process_speed[pktprcsed_i] = (double)(cur_rps - preprcsed) * US_PER_SECOND / timetime;
			 printf("[Log thread] requests: %d, rps: %f\n", cur_rps - preprcsed, packet_process_speed[pktprcsed_i]);
			 preprcsed = cur_rps;
			 pktprcsed_i = (pktprcsed_i + 1) % 10000;
			 pktprcsed_num++;
		}
		pre_time = cur_time; //
	}
}


